{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Feast Basic Customer Transactions Example"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This is a minimal example of using Feast. It shows how to get data into Feast and how to retrieve features for online serving and model training.\n",
    "\n",
    "In this example we will:\n",
    "1. Create a synthetic customer feature dataset\n",
    "2. Register a feature set to represent these features in Feast\n",
    "3. Ingest these features into Feast\n",
    "4. Create a feature query and retrieve online feature data\n",
    "5. Create a feature query and retrieve historical feature data"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 0. Configuration"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "\n",
    "# Feast Core acts as the central feature registry\n",
    "FEAST_CORE_URL = os.getenv('FEAST_CORE_URL', 'localhost:6565')\n",
    "\n",
    "# Feast Online Serving allows for the retrieval of real-time feature data\n",
    "FEAST_ONLINE_SERVING_URL = os.getenv('FEAST_ONLINE_SERVING_URL', 'localhost:6566')\n",
    "\n",
    "# Feast Batch Serving allows for the retrieval of historical feature data\n",
    "FEAST_BATCH_SERVING_URL = os.getenv('FEAST_BATCH_SERVING_URL', 'localhost:6567')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 1. Install Feast SDK"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Install the Feast SDK from PyPI."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!pip install feast"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 2. Import necessary modules"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "from datetime import datetime, timedelta\n",
    "from random import randrange\n",
    "\n",
    "import numpy as np\n",
    "import pandas as pd\n",
    "from pytz import utc\n",
    "\n",
    "from feast import Client, Entity, FeatureSet, ValueType"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 3. Configure Feast services and connect the Feast client\n",
    "\n",
    "Connect to Feast Core and Feast Online Serving."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "client = Client(core_url=FEAST_CORE_URL, serving_url=FEAST_ONLINE_SERVING_URL)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 4. Create customer features"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Here we will create customer features for 5 customers over the last 10 days. Each customer will have one set of feature values for every day."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Midnight UTC for each of the last 10 days, oldest first\n",
    "today = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=utc)\n",
    "days = [today - timedelta(days=day) for day in range(10)][::-1]\n",
    "\n",
    "customers = [1001, 1002, 1003, 1004, 1005]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "                    datetime  customer_id  daily_transactions  \\\n",
      "0  2020-06-17 00:00:00+00:00         1001            4.900417   \n",
      "1  2020-06-17 00:00:00+00:00         1002            7.440329   \n",
      "2  2020-06-17 00:00:00+00:00         1003            4.224760   \n",
      "3  2020-06-17 00:00:00+00:00         1004            5.482722   \n",
      "4  2020-06-17 00:00:00+00:00         1005            2.200896   \n",
      "5  2020-06-18 00:00:00+00:00         1001            8.173628   \n",
      "6  2020-06-18 00:00:00+00:00         1002            3.164327   \n",
      "7  2020-06-18 00:00:00+00:00         1003            7.248387   \n",
      "8  2020-06-18 00:00:00+00:00         1004            9.274397   \n",
      "9  2020-06-18 00:00:00+00:00         1005            7.846449   \n",
      "10 2020-06-19 00:00:00+00:00         1001            9.028874   \n",
      "11 2020-06-19 00:00:00+00:00         1002            5.140390   \n",
      "12 2020-06-19 00:00:00+00:00         1003            4.537877   \n",
      "13 2020-06-19 00:00:00+00:00         1004            6.797491   \n",
      "14 2020-06-19 00:00:00+00:00         1005            8.234574   \n",
      "15 2020-06-20 00:00:00+00:00         1001            8.319164   \n",
      "16 2020-06-20 00:00:00+00:00         1002            7.158817   \n",
      "17 2020-06-20 00:00:00+00:00         1003            4.920308   \n",
      "18 2020-06-20 00:00:00+00:00         1004            7.974404   \n",
      "19 2020-06-20 00:00:00+00:00         1005            2.298012   \n",
      "\n",
      "    total_transactions  \n",
      "0                   45  \n",
      "1                   77  \n",
      "2                    8  \n",
      "3                   40  \n",
      "4                   53  \n",
      "5                   33  \n",
      "6                   93  \n",
      "7                   68  \n",
      "8                   53  \n",
      "9                   11  \n",
      "10                  19  \n",
      "11                   2  \n",
      "12                   1  \n",
      "13                  59  \n",
      "14                  95  \n",
      "15                  37  \n",
      "16                  93  \n",
      "17                  73  \n",
      "18                  46  \n",
      "19                  12  \n"
     ]
    }
   ],
   "source": [
    "customer_features = pd.DataFrame(\n",
    "    {\n",
    "        \"datetime\": [day for day in days for customer in customers], # Datetime is required\n",
    "        \"customer_id\": [customer for day in days for customer in customers], # Customer is the entity\n",
    "        \"daily_transactions\": [np.random.rand() * 10 for _ in range(len(days) * len(customers))], # Feature 1\n",
    "        \"total_transactions\": [np.random.randint(100) for _ in range(len(days) * len(customers))], # Feature 2\n",
    "    }\n",
    ")\n",
    "\n",
    "print(customer_features.head(20))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 5. Create feature set for customer features"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now we will create a feature set for these features. A feature set is essentially a schema that describes feature values: it lets Feast identify both the feature values and their structure. The feature set below defines only the entity; it contains no features yet."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "customer_fs = FeatureSet(\n",
    "    \"customer_transactions\",\n",
    "    entities=[Entity(name='customer_id', dtype=ValueType.INT64)]\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Here we automatically infer the schema from the provided dataset. The two features in the dataset are added to the feature set."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Feature daily_transactions (ValueType.DOUBLE) added from dataframe.\n",
      "Feature total_transactions (ValueType.INT64) added from dataframe.\n",
      "\n"
     ]
    }
   ],
   "source": [
    "customer_fs.infer_fields_from_df(customer_features, replace_existing_features=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 6. Register feature set with Feast Core"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `apply()` method registers the provided feature set with Feast Core (the feature registry)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Feature set created: \"customer_transactions\"\n"
     ]
    }
   ],
   "source": [
    "client.apply(customer_fs)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{\n",
      "  \"spec\": {\n",
      "    \"name\": \"customer_transactions\",\n",
      "    \"entities\": [\n",
      "      {\n",
      "        \"name\": \"customer_id\",\n",
      "        \"valueType\": \"INT64\"\n",
      "      }\n",
      "    ],\n",
      "    \"features\": [\n",
      "      {\n",
      "        \"name\": \"daily_transactions\",\n",
      "        \"valueType\": \"DOUBLE\"\n",
      "      },\n",
      "      {\n",
      "        \"name\": \"total_transactions\",\n",
      "        \"valueType\": \"INT64\"\n",
      "      }\n",
      "    ],\n",
      "    \"source\": {\n",
      "      \"type\": \"KAFKA\",\n",
      "      \"kafkaSourceConfig\": {\n",
      "        \"bootstrapServers\": \"localhost:9094\",\n",
      "        \"topic\": \"feast-features\"\n",
      "      }\n",
      "    },\n",
      "    \"project\": \"default\"\n",
      "  },\n",
      "  \"meta\": {\n",
      "    \"createdTimestamp\": \"2020-06-26T12:27:17Z\",\n",
      "    \"status\": \"STATUS_PENDING\"\n",
      "  }\n",
      "}\n"
     ]
    }
   ],
   "source": [
    "customer_fs = client.get_feature_set(\"customer_transactions\")\n",
    "print(customer_fs)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 7. Ingest data into Feast for a feature set"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next we will ingest (load) data into Feast. This populates all registered stores (for example, BigQuery and Redis) with your feature data."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Waiting for feature set to be ready for ingestion...\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "100%|██████████| 50/50 [00:01<00:00, 47.23rows/s]"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Ingestion complete!\n",
      "\n",
      "Ingestion statistics:\n",
      "Success: 50/50\n",
      "Removing temporary file(s)...\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "'5e650050-f41d-39fc-bc56-d602c4a478d2'"
      ]
     },
     "execution_count": 10,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "client.ingest(\"customer_transactions\", customer_features)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 8. Retrieve online features"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The example below retrieves online features for a single customer, `1001`. Retrieval is not limited to a single feature set: you can request any features, as long as they are defined for the provided entities."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[fields {\n",
      "  key: \"customer_id\"\n",
      "  value {\n",
      "    int64_val: 1001\n",
      "  }\n",
      "}\n",
      "fields {\n",
      "  key: \"daily_transactions\"\n",
      "  value {\n",
      "    double_val: 0.12021977894872915\n",
      "  }\n",
      "}\n",
      "fields {\n",
      "  key: \"total_transactions\"\n",
      "  value {\n",
      "    int64_val: 0\n",
      "  }\n",
      "}\n",
      "statuses {\n",
      "  key: \"customer_id\"\n",
      "  value: PRESENT\n",
      "}\n",
      "statuses {\n",
      "  key: \"daily_transactions\"\n",
      "  value: PRESENT\n",
      "}\n",
      "statuses {\n",
      "  key: \"total_transactions\"\n",
      "  value: PRESENT\n",
      "}\n",
      "]\n"
     ]
    }
   ],
   "source": [
    "online_features = client.get_online_features(\n",
    "    feature_refs=[\n",
    "        \"daily_transactions\",\n",
    "        \"total_transactions\",\n",
    "    ],\n",
    "    entity_rows=[\n",
    "        {\n",
    "            \"customer_id\": 1001\n",
    "        }\n",
    "    ],\n",
    ")\n",
    "print(online_features.field_values)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To get the result as a dictionary, use the `to_dict()` method."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "{'daily_transactions': [0.12021977894872915],\n",
       " 'total_transactions': [0],\n",
       " 'customer_id': [1001]}"
      ]
     },
     "execution_count": 12,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "online_features.to_dict()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 9. Retrieve training features"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "(Requires Google Cloud Platform)\n",
    "\n",
    "To retrieve historical feature data, you must provide an `entity_rows` DataFrame. This DataFrame contains combinations of timestamps and entities (customers, in this case).\n",
    "\n",
    "Each timestamp corresponds to the event time at which a prediction needs to be made. For each of these points in time, you need to know the feature values that were \"current\" at that moment.\n",
    "\n",
    "We will randomly generate timestamps over the last 5 days and assign a `customer_id` to each of them.\n",
    "\n",
    "When these entity rows are sent to the Feast Serving API together with a list of feature references, Feast attaches the correct feature values to each entity row: for each entity, it joins the feature values that were current at that row's timestamp."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "                          datetime  customer_id\n",
      "0 2020-06-15 10:35:10.918716+00:00         1001\n",
      "1 2020-06-15 14:00:10.918758+00:00         1002\n",
      "2 2020-06-17 08:59:10.918767+00:00         1003\n",
      "3 2020-06-13 16:51:10.918774+00:00         1004\n",
      "4 2020-06-17 06:14:10.918780+00:00         1005\n",
      "5 2020-06-17 14:33:10.918786+00:00         1001\n",
      "6 2020-06-14 23:15:10.918792+00:00         1002\n",
      "7 2020-06-15 11:25:10.918798+00:00         1003\n",
      "8 2020-06-18 09:04:10.918804+00:00         1004\n",
      "9 2020-06-16 10:27:10.918810+00:00         1005\n"
     ]
    }
   ],
   "source": [
    "# 30 random event timestamps spread over the last 5 days\n",
    "event_timestamps = [\n",
    "    datetime.utcnow().replace(tzinfo=utc)\n",
    "    - timedelta(days=randrange(5), hours=randrange(24), minutes=randrange(60))\n",
    "    for _ in range(30)\n",
    "]\n",
    "\n",
    "entity_rows = pd.DataFrame(\n",
    "    {\n",
    "        \"datetime\": event_timestamps,\n",
    "        \"customer_id\": [customers[idx % len(customers)] for idx in range(len(event_timestamps))],\n",
    "    }\n",
    ")\n",
    "\n",
    "print(entity_rows.head(10))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 10. Retrieve historical/batch features"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next we will create a new client object, this time configured to connect to Feast Batch Serving. This service allows us to retrieve historical feature data."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [],
   "source": [
    "batch_client = Client(core_url=FEAST_CORE_URL, serving_url=FEAST_BATCH_SERVING_URL)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Calling the `get_historical_features` method returns a `job` object, which can be used to retrieve the training dataset that Feast exports.\n",
    "\n",
    "The returned dataset contains feature values for each entity and timestamp combination in `entity_rows`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "job = batch_client.get_historical_features(\n",
    "    feature_refs=[\n",
    "        \"daily_transactions\",\n",
    "        \"total_transactions\",\n",
    "    ],\n",
    "    entity_rows=entity_rows,\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Once the job is complete, the exported data can be retrieved from Google Cloud Storage and loaded into memory as a pandas DataFrame."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [],
   "source": [
    "df = job.to_dataframe()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "                    event_timestamp  customer_id  daily_transactions  \\\n",
      "0  2020-06-13 23:45:10.918874+00:00         1001            1.879220   \n",
      "1  2020-06-18 12:11:10.918845+00:00         1001            5.122846   \n",
      "2  2020-06-17 20:46:10.918903+00:00         1001            2.145294   \n",
      "3  2020-06-18 02:50:10.918816+00:00         1001            5.122846   \n",
      "4  2020-06-15 10:35:10.918716+00:00         1001            5.758472   \n",
      "5  2020-06-17 14:33:10.918786+00:00         1001            2.145294   \n",
      "6  2020-06-14 23:15:10.918792+00:00         1002            5.467141   \n",
      "7  2020-06-14 07:22:10.918851+00:00         1002            5.467141   \n",
      "8  2020-06-17 23:40:10.918880+00:00         1002            3.338614   \n",
      "9  2020-06-15 14:00:10.918758+00:00         1002            4.921264   \n",
      "10 2020-06-15 18:22:10.918909+00:00         1002            4.921264   \n",
      "11 2020-06-16 21:10:10.918822+00:00         1002            1.838186   \n",
      "12 2020-06-18 05:47:10.918886+00:00         1003            2.702916   \n",
      "13 2020-06-17 08:59:10.918767+00:00         1003            0.211125   \n",
      "14 2020-06-15 11:25:10.918798+00:00         1003            4.476252   \n",
      "15 2020-06-16 09:56:10.918857+00:00         1003            9.123597   \n",
      "16 2020-06-14 11:39:10.918915+00:00         1003            6.353373   \n",
      "17 2020-06-15 03:21:10.918828+00:00         1003            4.476252   \n",
      "18 2020-06-18 09:04:10.918804+00:00         1004            8.756623   \n",
      "19 2020-06-14 14:18:10.918834+00:00         1004            8.647374   \n",
      "20 2020-06-17 03:10:10.918863+00:00         1004            2.377199   \n",
      "21 2020-06-13 16:51:10.918774+00:00         1004            6.362085   \n",
      "22 2020-06-15 03:54:10.918892+00:00         1004            8.235070   \n",
      "23 2020-06-17 19:01:10.918921+00:00         1004            2.377199   \n",
      "24 2020-06-17 06:14:10.918780+00:00         1005            9.958688   \n",
      "25 2020-06-16 08:23:10.918839+00:00         1005            0.006388   \n",
      "26 2020-06-16 00:30:10.918927+00:00         1005            0.006388   \n",
      "27 2020-06-16 10:27:10.918810+00:00         1005            0.006388   \n",
      "28 2020-06-17 01:50:10.918869+00:00         1005            9.958688   \n",
      "29 2020-06-17 08:42:10.918897+00:00         1005            9.958688   \n",
      "\n",
      "    total_transactions  \n",
      "0                    7  \n",
      "1                   96  \n",
      "2                   63  \n",
      "3                   96  \n",
      "4                   85  \n",
      "5                   63  \n",
      "6                   10  \n",
      "7                   10  \n",
      "8                   50  \n",
      "9                   55  \n",
      "10                  55  \n",
      "11                  83  \n",
      "12                  50  \n",
      "13                  96  \n",
      "14                  61  \n",
      "15                  85  \n",
      "16                  69  \n",
      "17                  61  \n",
      "18                  84  \n",
      "19                  95  \n",
      "20                  25  \n",
      "21                   2  \n",
      "22                  58  \n",
      "23                  25  \n",
      "24                   6  \n",
      "25                  36  \n",
      "26                  36  \n",
      "27                  36  \n",
      "28                   6  \n",
      "29                   6  \n"
     ]
    }
   ],
   "source": [
    "print(df.head(50))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The training dataset is staged on Google Cloud Storage. If it is too large to load into memory, the exported files can be accessed directly."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "job.get_avro_files()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "local-feast",
   "language": "python",
   "name": "local-feast"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
